Skip to content

feat: add subscriptions hook#1243

Closed
alepane21 wants to merge 47 commits intotopic/streams-v1from
ale/eng-7600-add-subscriptiononstarthandler
Closed

feat: add subscriptions hook#1243
alepane21 wants to merge 47 commits intotopic/streams-v1from
ale/eng-7600-add-subscriptiononstarthandler

Conversation

@alepane21
Copy link
Copy Markdown
Contributor

@alepane21 alepane21 commented Jul 18, 2025

The demo_mode will serve a demo execution config if is not found elsewhere (controlplane, static execution config).
In this PR the flag is improved so that, when the demo_mode is enabled, the router can start also if no graph token is set.

Summary by CodeRabbit

  • New Features

    • Added pre-subscription startup hooks for subscriptions to emit initial events or abort startup.
    • Enhanced subscription context with a callable update emitter so hooks can push updates into the subscription stream.
  • Tests

    • Added tests validating hook invocation, error propagation from hooks, and update emission behavior (including heavy and concurrent emission scenarios).

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jul 18, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Introduced a subscription start hook mechanism: new hook type and config field, plumbing through GraphQL subscription source, resolve layer invocation that can emit updates via Context, a HookableSubscriptionDataSource interface, Context emit API, and accompanying unit tests exercising hook invocation and error propagation.

Changes

Cohort / File(s) Change Summary
Configuration
v2/pkg/engine/datasource/graphql_datasource/configuration.go
Added SubscriptionOnStartFn type and StartupHooks []SubscriptionOnStartFn on SubscriptionConfiguration.
GraphQL datasource
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go, .../graphql_datasource_test.go
SubscriptionSource gains subscriptionOnStartFns []SubscriptionOnStartFn; Planner.ConfigureSubscription wires hooks from config; added SubscriptionOnStart(ctx *resolve.Context, input []byte) error; tests updated and new TestSubscriptionSource_SubscriptionOnStart added.
Resolve context
v2/pkg/engine/resolve/context.go
Added private emitEventFn func(data []byte) to Context, new EmitSubscriptionUpdate(data []byte) method, and cleared callback in Free().
Resolve datasource interface
v2/pkg/engine/resolve/datasource.go
Added HookableSubscriptionDataSource interface with SubscriptionOnStart(ctx *Context, input []byte) (err error).
Resolve core logic
v2/pkg/engine/resolve/resolve.go
Integrated start-hook invocation into subscription startup paths: create updater, clone context with emit callback, call SubscriptionOnStart (sync/async), handle hook errors by writing error, closing trigger and cancelling; minor event struct alignment.
Resolve tests
v2/pkg/engine/resolve/resolve_test.go
Extended createFakeStream and _fakeStream to accept and forward subscriptionOnStartFn; added tests validating hook invocation, Context-based update emission, concurrency, and error handling; updated existing calls to pass nil where unchanged.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


🧪 Early access (Sonnet 4.5): enabled

We are currently testing the Sonnet 4.5 model, which is expected to improve code review quality. However, this model may lead to increased noise levels in the review comments. Please disable the early access features if the noise level causes any inconvenience.

Note:

  • Public repositories are always opted into early access features.
  • You can enable or disable early access features from the CodeRabbit UI or by updating the CodeRabbit configuration file.

Comment @coderabbitai help to get the list of available commands and usage tips.

@alepane21
Copy link
Copy Markdown
Contributor Author

alepane21 commented Jul 19, 2025

@coderabbitai full review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jul 19, 2025

✅ Actions performed

Full review triggered.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jul 19, 2025

Tip

For best results, initiate chat on the files or code changes.

🧩 Analysis chain

I'll perform a full review of this PR. Let me gather the necessary information about the changes and analyze the code.


🏁 Script executed:

#!/bin/bash
# Get PR information
gh pr view 1243 --json title,body,files,additions,deletions,changedFiles

# Get the list of changed files
gh pr diff 1243 --name-only

# Get the full diff to understand the changes
gh pr diff 1243

Length of output: 13046


@alepane21 Here's my comprehensive review of the subscription hooks feature:

🎯 Overall Assessment

This is a well-implemented feature that adds subscription hooks to the GraphQL datasource. The code is clean, follows consistent patterns, and includes excellent test coverage.

Strengths

1. Clean API Design

  • The OnSubscriptionStartFn function signature is well-designed, accepting context and returning events with error handling
  • Integration into existing SubscriptionConfiguration is seamless

2. Robust Implementation

  • Both Start and AsyncStart methods are consistently updated
  • Proper error propagation - if any hook fails, the subscription doesn't start
  • Hook-generated events are processed before the actual subscription begins

3. Excellent Test Coverage

The test suite covers all critical scenarios:

  • Hook execution verification
  • Error handling in both sync and async modes
  • Message injection functionality
  • Integration with real subscription server

🔍 Code Quality Observations

Configuration Changes (configuration.go)

type OnSubscriptionStartFn func(ctx *resolve.Context) ([][]byte, error)
  • Good function signature design
  • Clear naming convention
  • Proper integration with existing configuration structure

Implementation (graphql_datasource.go)

for _, fn := range s.onSubscriptionStartFns {
    events, err := fn(ctx)
    if err != nil {
        return err
    }
    for _, event := range events {
        updater.Update(event)
    }
}
  • Clean iteration pattern
  • Proper error handling with early return
  • Consistent implementation across both methods

💡 Suggestions for Improvement

1. Documentation

Consider adding godoc comments for the new types:

// OnSubscriptionStartFn defines a hook function that is called when a subscription starts.
// It receives the resolve context and can return initial events or an error.
// If an error is returned, the subscription will not start.
type OnSubscriptionStartFn func(ctx *resolve.Context) ([][]byte, error)

2. Hook Execution Order

The current implementation processes hooks sequentially. Consider documenting this behavior or potentially adding support for:

  • Parallel execution (if hooks are independent)
  • Priority-based ordering
  • Hook cancellation

3. Event Validation

Consider adding validation for hook-generated events to ensure they conform to the expected GraphQL response format.

🧪 Test Quality

The tests are exemplary:

  • Comprehensive scenarios: Success, error, and message injection cases
  • Real integration: Uses actual subscription server
  • Proper assertions: Verifies both hook execution and subscription behavior
  • Good test isolation: Each test is independent and well-structured

🚀 Ready for Production

This implementation is production-ready with:

  • ✅ Proper error handling
  • ✅ Comprehensive test coverage
  • ✅ Clean API design
  • ✅ Consistent implementation patterns
  • ✅ No breaking changes to existing functionality

The feature successfully enables pre-subscription hooks while maintaining backward compatibility and following the existing codebase patterns.

🐰✨
Excellent work on this subscription hooks implementation! The attention to detail and comprehensive testing really shines through.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

8905-9071: Consider adding test coverage for additional scenarios

The test suite for onSubscriptionStartFns is well-structured, but consider adding the following test cases for better coverage:

  1. Test with multiple start functions to ensure they are all called in order
  2. Test with nil/empty onSubscriptionStartFns to ensure no panics occur
  3. Test the message emission scenario with the Start method (currently only AsyncStart is tested on lines 9053-9071)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7de2a2e and f8de362.

📒 Files selected for processing (3)
  • v2/pkg/engine/datasource/graphql_datasource/configuration.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (4 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)
Learnt from: SkArchon
PR: wundergraph/graphql-go-tools#1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.
🧬 Code Graph Analysis (1)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (1)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)
  • OnSubscriptionStartFn (107-107)
🔇 Additional comments (7)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (3)

12-12: LGTM: Import added for new functionality.

The resolve package import is necessary for the new OnSubscriptionStartFn function type definition.


107-107: LGTM: Well-designed function type for subscription hooks.

The function signature is clear and follows Go conventions. The return type ([][]byte, error) appropriately allows for multiple events to be emitted with proper error handling.


125-125: LGTM: Subscription configuration properly extended.

The new field allows multiple subscription start hooks to be configured, which aligns with the flexible design of the hook mechanism.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (4)

453-454: LGTM: Subscription source properly configured with hooks.

The hooks are correctly passed from the configuration to the SubscriptionSource, maintaining the chain of configuration from config to implementation.


936-937: LGTM: SubscriptionSource struct properly extended.

The new onSubscriptionStartFns field cleanly integrates with the existing structure and uses appropriate naming conventions.


949-957: LGTM: Hook invocation properly implemented.

The pre-subscription hook logic is correctly implemented with:

  • Proper error handling that aborts subscription on hook failure
  • Correct event forwarding from hooks to the subscription updater
  • Sequential execution ensuring hooks complete before subscription starts

977-985: LGTM: Consistent hook implementation across methods.

The hook invocation logic is identical to AsyncStart, ensuring consistent behavior between synchronous and asynchronous subscription starts.

@alepane21 alepane21 marked this pull request as ready for review July 19, 2025 11:47
Comment thread v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go Outdated
Comment thread v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go Outdated
Comment thread v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go Outdated
Comment thread v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go Outdated
Comment thread v2/pkg/engine/datasource/graphql_datasource/configuration.go Outdated
@alepane21 alepane21 marked this pull request as draft July 23, 2025 07:20
@alepane21
Copy link
Copy Markdown
Contributor Author

@coderabbitai full review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jul 23, 2025

✅ Actions performed

Full review triggered.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (3)

8911-8914: Consider simplifying the channel struct for better readability.

The anonymous struct in the channel could be simplified since you're testing individual parameters separately.

Consider this approach for cleaner code:

-		startFnCalled := make(chan struct {
-			ctx   *resolve.Context
-			input []byte
-		}, 1)
+		var capturedCtx *resolve.Context
+		var capturedInput []byte
+		startFnCalled := make(chan struct{}, 1)

Then update the hook function:

				func(ctx *resolve.Context, input []byte) (bool, error) {
-					startFnCalled <- struct {
-						ctx   *resolve.Context
-						input []byte
-					}{ctx, input}
+					capturedCtx = ctx
+					capturedInput = input
+					startFnCalled <- struct{}{}
					return false, nil
				},

And the assertions:

-		called := <-startFnCalled
-		assert.Equal(t, ctx, called.ctx)
-		assert.Equal(t, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`), called.input)
+		<-startFnCalled
+		assert.Equal(t, ctx, capturedCtx)
+		assert.Equal(t, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`), capturedInput)

8927-8933: Add timeout safety to prevent hanging tests.

Consider adding a timeout to the channel receive operation to prevent tests from hanging if hooks fail to execute.

-		require.Len(t, startFnCalled, 1)
-		called := <-startFnCalled
+		require.Len(t, startFnCalled, 1)
+		select {
+		case called := <-startFnCalled:
+			// existing assertions
+		case <-time.After(time.Second):
+			t.Fatal("hook was not called within timeout")
+		}

8905-8953: Consider adding test cases for multiple hooks and close=true scenario.

The current tests provide good basic coverage, but consider adding these scenarios for more comprehensive testing:

  1. Multiple hooks execution: Test that all hooks are called in order and verify behavior when some succeed and others fail.

  2. Close=true scenario: Test the case where a hook returns (true, nil) to verify the close flag is properly propagated.

  3. Mixed scenarios: Test combinations of multiple hooks with different return values.

Example additional test case:

t.Run("OnSubscriptionStart with multiple hooks", func(t *testing.T) {
    var callOrder []int
    subscriptionSource := SubscriptionSource{
        onSubscriptionStartFns: []OnSubscriptionStartFn{
            func(ctx *resolve.Context, input []byte) (bool, error) {
                callOrder = append(callOrder, 1)
                return false, nil
            },
            func(ctx *resolve.Context, input []byte) (bool, error) {
                callOrder = append(callOrder, 2)
                return false, nil
            },
        },
    }
    
    close, err := subscriptionSource.OnSubscriptionStart(ctx, []byte(`{}`))
    require.NoError(t, err)
    assert.False(t, close)
    assert.Equal(t, []int{1, 2}, callOrder)
})
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f416831 and 3cd5690.

📒 Files selected for processing (7)
  • v2/pkg/engine/datasource/graphql_datasource/configuration.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (2 hunks)
  • v2/pkg/engine/resolve/context.go (2 hunks)
  • v2/pkg/engine/resolve/datasource.go (1 hunks)
  • v2/pkg/engine/resolve/resolve.go (6 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (19 hunks)
🧠 Learnings (3)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)

Learnt from: SkArchon
PR: #1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

v2/pkg/engine/resolve/resolve_test.go (1)

Learnt from: SkArchon
PR: #1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

Learnt from: SkArchon
PR: #1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

🧰 Additional context used
🧠 Learnings (3)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)

Learnt from: SkArchon
PR: #1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

v2/pkg/engine/resolve/resolve_test.go (1)

Learnt from: SkArchon
PR: #1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

Learnt from: SkArchon
PR: #1203
File: v2/pkg/engine/resolve/loader.go:63-67
Timestamp: 2025-07-02T15:28:02.122Z
Learning: In the graphql-go-tools codebase, result structs are consistently initialized with non-nil bytes.Buffer instances, making additional nil checks for res.out unnecessary defensive programming.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
🔇 Additional comments (20)
v2/pkg/engine/resolve/datasource.go (1)

30-41: Well-designed subscription hook interface.

The SubscriptionDataSourceHookable interface is cleanly designed with clear semantics. The method signature appropriately combines lifecycle control (close boolean) with error handling, and the comprehensive documentation explains all behaviors clearly.

v2/pkg/engine/resolve/context.go (3)

36-36: Good encapsulation of subscription updater functionality.

The private subscriptionUpdater field maintains proper encapsulation while providing the necessary functionality for subscription updates.


109-114: Well-documented setter with clear usage patterns.

The SetSubscriptionUpdater method provides a clean API for configuring the subscription updater callback, with excellent documentation covering various usage scenarios including testing and extensibility.


116-123: Safe and well-implemented subscription update emitter.

The EmitSubscriptionUpdate method correctly handles the nil case and provides a safe API for external code to emit subscription updates. The implementation is robust and well-documented.

v2/pkg/engine/datasource/graphql_datasource/configuration.go (3)

12-12: Necessary import addition for subscription hooks.

The resolve package import is correctly added to support the new OnSubscriptionStartFn type that uses *resolve.Context.


107-112: Excellent function type definition with comprehensive documentation.

The OnSubscriptionStartFn type alias provides a clean, reusable definition that matches the interface signature exactly. The documentation thoroughly explains all parameters, return values, and behaviors.


130-130: Well-designed configuration field for multiple subscription hooks.

The OnSubscriptionStartFns field appropriately uses a slice to support multiple hooks, providing flexibility in the subscription hook system while maintaining clean configuration structure.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

4022-4022: LGTM! Proper struct field initialization.

The explicit field assignment improves code clarity and correctness.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3)

453-454: LGTM! Clean integration with configuration.

The initialization of onSubscriptionStartFns from the configuration follows the established pattern and correctly passes the hook functions to the subscription source.


1943-1944: LGTM! Proper struct field addition.

The onSubscriptionStartFns field is correctly typed and follows Go naming conventions. The placement within the struct is logical.


1995-2003: LGTM! Well-implemented hook execution pattern.

The OnSubscriptionStart method correctly implements the subscription hook interface:

  • Sequential execution of hooks with proper early termination on error/close
  • Clean error propagation and return value handling
  • Simple, readable implementation that follows Go conventions

The method integrates well with the subscription resolver logic mentioned in previous reviews.

v2/pkg/engine/resolve/resolve_test.go (4)

4790-4797: Well-designed API extension for subscription start hook testing.

The addition of the onSubscriptionStartFn parameter to createFakeStream is clean and follows the existing pattern for optional callbacks. The function signature appropriately matches the expected hook interface.


4811-4816: Clean implementation of the OnSubscriptionStart hook.

The method correctly handles the nil case and delegates to the configured function when available. The default return values (false, nil) are appropriate.


5447-5479: Comprehensive test for OnSubscriptionStart hook invocation.

The test effectively verifies that the hook is called during subscription resolution. Good use of channels for synchronization and proper timeout handling.


5481-5510: Excellent integration test for subscription updater in OnSubscriptionStart.

The test effectively verifies that:

  1. The context passed to the hook has a working subscription updater
  2. Messages emitted from the hook are delivered before regular stream messages
  3. The message ordering is preserved correctly

This is a valuable test for ensuring the hook can inject initial state or configuration messages.

v2/pkg/engine/resolve/resolve.go (5)

572-580: LGTM! Clean implementation of the subscription start hook.

The helper function properly checks for the interface implementation and delegates to the hook method with appropriate return values.


603-607: Good design for enabling external subscription updates.

The subscription updater callback provides a clean API for external code to emit updates while maintaining encapsulation of internal subscription handling logic.


622-628: Proper hook integration in the existing trigger path.

The hook is correctly invoked after adding the subscription to an existing trigger, with appropriate error handling and trigger closure logic based on the hook's return value.


665-672: Consistent hook implementation in the new trigger path.

The hook is properly invoked with the same error handling pattern. The early return when close=true correctly prevents starting the trigger goroutine.


792-830: Excellent refactoring of subscription update handling.

The extraction of handleTriggerUpdateSubscription improves code modularity and reusability. The method properly handles:

  • Client disconnection checks
  • Event filtering with error handling
  • Non-blocking work scheduling with context cancellation support

This refactoring enables the subscription updater callback to reuse the same update logic.

@alepane21 alepane21 marked this pull request as ready for review July 23, 2025 07:58
@alepane21 alepane21 requested a review from jensneuse July 23, 2025 08:03
@alepane21
Copy link
Copy Markdown
Contributor Author

@coderabbitai full review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Aug 21, 2025

✅ Actions performed

Full review triggered.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
v2/pkg/engine/datasource/graphql_datasource/configuration.go (1)

128-130: Document hook execution order and failure semantics.

Callers will benefit from knowing whether hooks run sequentially in declaration order, whether later hooks run if an earlier one returns an error, and whether they may run in parallel in the future.

Apply this diff:

-    // StartupHooks contains the method called when a subscription is started
-    StartupHooks []SubscriptionOnStartFn
+    // StartupHooks are invoked when a subscription is started.
+    // Execution: sequential, in declaration order. If any hook returns an error, remaining hooks are not executed and startup aborts.
+    // Hooks may call ctx.EmitSubscriptionUpdate to pre-inject messages before the subscription connection is established.
+    StartupHooks []SubscriptionOnStartFn
v2/pkg/engine/resolve/resolve_test.go (1)

4799-4800: Naming nit: consider a more descriptive return than “done”

Past feedback suggested renaming “done” to “accepted” for clarity. Here “done” seems to mean “the source has finished emitting.” Consider “finished” or “complete” to better communicate semantics.

🧹 Nitpick comments (19)
v2/pkg/engine/resolve/context.go (2)

109-116: Provide a generic alias EmitEvent to align with prior discussion and future-proof naming.

Past discussion mentioned adding ctx.EmitEvent(...). Keeping EmitSubscriptionUpdate is fine, but a generic alias improves API ergonomics and avoids over-scoping to subscriptions.

No behavior change; just a thin wrapper:

// add in the same file (outside the changed lines)
func (c *Context) EmitEvent(data []byte) {
	c.EmitSubscriptionUpdate(data)
}

Optional: If you want to signal whether an emission happened, add a bool return in the alias (and leave the current method as-is):

func (c *Context) TryEmitEvent(data []byte) bool {
	emitEventFn := c.emitEventFn
	if emitEventFn == nil {
		return false
	}
	emitEventFn(data)
	return true
}

219-219: Avoid calling Free() concurrently with event emission.

Resetting emitEventFn to nil in Free() is correct, but it must not race with EmitSubscriptionUpdate. If that invariant is upheld by the engine, consider adding a brief note to Free() or the field comment to make this explicit. Otherwise, a sync primitive would be required here (not recommended unless you want to widen Context’s concurrency guarantees).

v2/pkg/engine/resolve/datasource.go (1)

30-40: Clarify when the hook runs (post-dedup) and what payload format Emit should use.

To make implementers’ lives easier, please tighten the doc comment to explicitly state that:

  • The hook is invoked after request deduplication (mirroring Start’s contract).
  • Hook code may call ctx.EmitSubscriptionUpdate with GraphQL subscription envelopes (data/errors per spec), and that returning an error aborts startup.

Apply this diff to the comment block:

-// HookableSubscriptionDataSource is a hookable interface for subscription data sources.
-// It is used to call a function when a subscription is started.
-// This is useful for data sources that need to do some work when a subscription is started,
-// e.g. to establish a connection to the data source or to emit updates to the client.
-// The function is called with the context and the input of the subscription.
-// The function is called before the subscription is started and can be used to emit updates to the client.
+// HookableSubscriptionDataSource enables a pre-start hook for subscriptions.
+// The hook is invoked after request deduplication but before the subscription Start/AsyncStart.
+// Implementations can perform startup work and/or emit updates via ctx.EmitSubscriptionUpdate.
+// Emitted payloads should be valid GraphQL subscription envelopes (data/errors as applicable).
+// Returning a non-nil error aborts subscription startup and is propagated to the client.
 type HookableSubscriptionDataSource interface {
 	// SubscriptionOnStart is called when a new subscription is created
 	// If an error is returned, the error is propagated to the client.
 	SubscriptionOnStart(ctx *Context, input []byte) (err error)
 }
v2/pkg/engine/datasource/graphql_datasource/configuration.go (2)

12-12: Watch cross-package coupling: config -> resolve dependency.

Importing resolve from the datasource configuration package tightens layering. It’s acceptable here, but if you want to keep configuration decoupled from engine internals, consider defining a minimal interface in this package (e.g., an EventEmitter with EmitEvent([]byte)) and type the hook against that instead of *resolve.Context. The backing engine can pass a compatible adapter.


107-111: Strengthen the hook contract in docs (immutability and abort behavior).

Make it explicit that the input must be treated as read-only and that returning an error prevents the subscription from starting.

Apply this diff to the comment:

-// SubscriptionOnStartFn defines a hook function that is called when a subscription starts.
-// It receives the resolve context and the input of the subscription.
-// The function can return an error.
+// SubscriptionOnStartFn defines a hook invoked just before the subscription starts.
+// It receives the resolve Context and the subscription input (read-only).
+// The hook may emit zero or more updates via ctx.EmitSubscriptionUpdate.
+// If the hook returns a non-nil error, startup is aborted and the error is propagated to the client.
 type SubscriptionOnStartFn func(ctx *resolve.Context, input []byte) (err error)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (3)

4022-4022: Prefer a test-local HTTP client with a timeout over http.DefaultClient

Using http.DefaultClient can share global state and has no timeout by default, which risks test hangs. For determinism and isolation, use a local client with a finite timeout (also consistent with the pattern used in newSubscriptionSource further down).

Apply this diff:

-                       client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
+                       client: NewGraphQLSubscriptionClient(&http.Client{Timeout: 10 * time.Second}, http.DefaultClient, ctx),

8908-8910: Remove no-op defer; Done() doesn’t cancel context

defer ctx.Context().Done() is a no-op (Done() returns a channel; calling it doesn’t cancel anything). Either drop the line or create a cancellable parent context and pass it to resolve.NewContext if you intend to cancel.

Apply this diff to simply remove the no-op:

-        ctx := resolve.NewContext(context.Background())
-        defer ctx.Context().Done()
+        ctx := resolve.NewContext(context.Background())

8938-8954: Add ordering/short-circuit and empty-hooks tests for stronger guarantees

You already verify error propagation for a single hook. Two quick additions would lock in contract details:

  • All hooks run sequentially and the chain short-circuits on the first error.
  • No hooks (nil/empty slice) is a no-op and returns nil.

Apply this diff to extend the test function with two extra subtests:

   t.Run("SubscriptionOnStart calls subscriptionOnStartFns and returns error if one of the functions returns an error", func(t *testing.T) {
     ctx := resolve.NewContext(context.Background())
-    defer ctx.Context().Done()
+    defer ctx.Context().Done() // retained in this subtest for minimal change; you may remove as per earlier comment.

     subscriptionSource := SubscriptionSource{
       subscriptionOnStartFns: []SubscriptionOnStartFn{
         func(ctx *resolve.Context, input []byte) error {
           return errors.New("test error")
         },
       },
     }

     err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`))
     require.Error(t, err)
     assert.ErrorContains(t, err, "test error")
   })
+
+  t.Run("SubscriptionOnStart executes hooks sequentially and short-circuits on error", func(t *testing.T) {
+    ctx := resolve.NewContext(context.Background())
+    // no-op defer removed as discussed earlier
+
+    secondCalled := make(chan struct{}, 1)
+    subscriptionSource := SubscriptionSource{
+      subscriptionOnStartFns: []SubscriptionOnStartFn{
+        func(ctx *resolve.Context, input []byte) error {
+          return errors.New("boom")
+        },
+        func(ctx *resolve.Context, input []byte) error {
+          secondCalled <- struct{}{}
+          return nil
+        },
+      },
+    }
+
+    err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables":{}}`))
+    require.Error(t, err)
+    // Ensure the second hook did not run after the first failed.
+    select {
+    case <-secondCalled:
+      t.Fatal("second hook should not be called after an error in the first hook")
+    default:
+    }
+  })
+
+  t.Run("SubscriptionOnStart with no hooks is a no-op", func(t *testing.T) {
+    ctx := resolve.NewContext(context.Background())
+    subscriptionSource := SubscriptionSource{} // no hooks
+    err := subscriptionSource.SubscriptionOnStart(ctx, []byte(`{"variables":{}}`))
+    require.NoError(t, err)
+  })
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (2)

1946-1948: Document concurrency/immutability guarantees for these fields.

subscriptionOnStartFns is read-only after construction, which is a useful guarantee for thread-safety. A short comment here will prevent future accidental mutation.

 type SubscriptionSource struct {
-	client                 GraphQLSubscriptionClient
-	subscriptionOnStartFns []SubscriptionOnStartFn
+	client                 GraphQLSubscriptionClient
+	// subscriptionOnStartFns are configured at planning time and must be treated as immutable
+	// during subscription execution to avoid races.
+	subscriptionOnStartFns []SubscriptionOnStartFn
 }

1998-2006: Harden hook execution: recover from panics to avoid taking down the process.

Hooks are user/module-provided; a panic here would crash the server. Wrap execution with a recover() to convert panics into errors and fail the subscription gracefully.

-func (s *SubscriptionSource) SubscriptionOnStart(ctx *resolve.Context, input []byte) (err error) {
-	for _, fn := range s.subscriptionOnStartFns {
+func (s *SubscriptionSource) SubscriptionOnStart(ctx *resolve.Context, input []byte) (err error) {
+	// Convert panics in hooks into returned errors to protect the engine.
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("subscription on-start hook panic: %v", r)
+		}
+	}()
+	for _, fn := range s.subscriptionOnStartFns {
 		err = fn(ctx, input)
 		if err != nil {
 			return err
 		}
 	}
 	return
 }

Optionally add a brief godoc on the method clarifying: sequential execution, stop-on-first-error, and that any error prevents the subscription from starting.

v2/pkg/engine/resolve/resolve.go (2)

633-634: Return value ignored in this path; consider renaming or removing the bool.

go startHook() runs asynchronously and the return value is dropped. Either:

  • rename to startHookAsync() and remove the bool return, or
  • handle the result (e.g., by signaling back via an event) if you need to act on it.

Minor clean-up for clarity.


1189-1256: Avoid broadcasting hook updates — add a targeted subscription event

Verified: subscriptionUpdater.Update currently sends subscriptionEventKindTriggerUpdate (v2/pkg/engine/resolve/resolve.go:1196–1208) and the event loop (handleTriggerUpdate at v2/pkg/engine/resolve/resolve.go:785–811) fans that update out to every subscription under the trigger. The subscriptionEvent struct already includes id SubscriptionIdentifier (v2/pkg/engine/resolve/resolve.go:1258–1261), so the shape needed for a targeted event exists.

What to change (minimal, safe refactor):

  • Add a new event kind next to the existing constants:
    • v2/pkg/engine/resolve/resolve.go (around 1276): add subscriptionEventKindTargetedUpdate.
  • Add a targeted-emission API (one of these options):
    • Add a small method on subscriptionUpdater:
      • func (s *subscriptionUpdater) UpdateTo(id SubscriptionIdentifier, data []byte) { ... }
        — which sends subscriptionEvent{ triggerID: s.triggerID, id: id, kind: subscriptionEventKindTargetedUpdate, data: data }.
    • OR add Context.EmitSubscriptionUpdateToSelf(data) (filled by per-sub hook wiring) that emits a subscriptionEvent with kind subscriptionEventKindTargetedUpdate and the target id.
  • Route targeted events in the single-threaded event loop (preserves ordering):
    • In the event switch (v2/pkg/engine/resolve/resolve.go: around handleEvent / handleTriggerUpdate), add handling for subscriptionEventKindTargetedUpdate that:
      • looks up the trigger (r.triggers[event.triggerID]),
      • looks up the specific subscription by event.id (SubscriptionIdentifier),
      • enqueues the same workItem{fn, false} for only that subscription’s worker (instead of iterating all subs).
    • Keep existing subscriptionEventKindTriggerUpdate behavior for backward compatibility.
  • Tests:
    • Add a unit test under v2/pkg/engine/resolve/* (resolve_test.go area) that asserts EmitSubscriptionUpdateToSelf (or UpdateTo) sends the message only to the intended subscription and not to others. Update/extend existing hook tests (they already use ctx.EmitSubscriptionUpdate) to cover both broadcast and targeted cases.

Why this is safe and useful:

  • subscriptionEvent already contains SubscriptionIdentifier, so no structural change is required.
  • The event still flows through the single-threaded event loop (ordering preserved) but avoids per-sub filter invocation and accidental delivery to unrelated subscriptions.
  • Keeps backward compatibility: existing broadcasts continue to work.

If you want I can:

  • Produce the precise diff for these edits (const + new method + event-loop case + test stub).
v2/pkg/engine/resolve/resolve_test.go (7)

4790-4796: Test helper signature grew; consider options to avoid future churn

Adding the 4th parameter solved the immediate need, but it forced touching many callsites. To limit future churn if we add more toggles (e.g., more hooks), consider switching to functional options or a small options struct so callsites remain stable.


4812-4817: No-op when hook is nil is correct; add a short doc comment for call order

A tiny comment clarifying that this is invoked by the resolver before Start would aid future readers.

Apply this diff to add a brief comment:

+// SubscriptionOnStart is invoked by the resolver before the stream Start().
+// It lets tests simulate subscription startup hooks and optionally emit pre-start updates.
 func (f *_fakeStream) SubscriptionOnStart(ctx *Context, input []byte) (err error) {

5452-5484: Strengthen the hook invocation test by asserting the hook sees the expected input

You already assert the request in onStart. Also verify the exact input received by the hook closure to catch any wiring regressions.

Apply this diff inside the hook closure:

-        }, func(ctx *Context, input []byte) (err error) {
-            called <- true
-            return nil
-        })
+        }, func(ctx *Context, input []byte) (err error) {
+            expected := `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`
+            require.Equal(t, expected, string(input))
+            called <- true
+            return nil
+        })

5486-5515: Avoid flakiness: disable heartbeats when asserting exact message counts

This test asserts an exact count of 2 messages. Heartbeats are time-based and can introduce spurious messages under load. Disabling them here makes the test robust.

         ctx := &Context{
             ctx: context.Background(),
             ExecutionOptions: ExecutionOptions{
-                SendHeartbeat: true,
+                SendHeartbeat: false,
             },
         }

5517-5551: Same here: turn off heartbeats when asserting a precise number of updates

You assert workChanBufferSize+2 messages. Any heartbeat could break the equality. Turn them off for this test too.

         ctx := &Context{
             ctx: context.Background(),
             ExecutionOptions: ExecutionOptions{
-                SendHeartbeat: true,
+                SendHeartbeat: false,
             },
         }

5554-5623: Remove unused variables and simplify expectations in mixed-source test

  • messagesDroppedFromHook is never incremented and always 0.
  • hookCompleted is written to but never read; dead code and potential goroutine leak on channel writes if a future refactor blocks.

Cleaning these up reduces noise and makes the test intent clearer.

Apply this diff:

-        messagesDroppedFromHook := &atomic.Int32{}
         messagesToSendFromOtherSources := int32(100)
@@
-        firstMessageArrived := make(chan bool, 1)
-        hookCompleted := make(chan bool, 1)
+        firstMessageArrived := make(chan bool, 1)
@@
-            if counter == int(messagesToSendFromOtherSources)-1 {
-                select {
-                case hookCompleted <- true:
-                case <-time.After(defaultTimeout):
-                }
-            }
             return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == int(messagesToSendFromOtherSources)-1
@@
-            go func() {
+            go func() {
                 // Wait for the first message to arrive before sending updates
                 select {
                 case <-firstMessageArrived:
                     for i := 1; i < int(messagesToSendFromHook); i++ {
                         ctx.EmitSubscriptionUpdate([]byte(fmt.Sprintf(`{"data":{"counter":%d}}`, i+20000)))
                     }
-                    hookCompleted <- true
                 case <-time.After(defaultTimeout):
                     // if the first message did not arrive, do not send any updates
                     return
                 }
             }()
@@
-        assert.Equal(t, int32(messagesToSendFromHook+messagesToSendFromOtherSources-messagesDroppedFromHook.Load()+messagesHeartbeat), int32(len(recorder.Messages())))
+        assert.Equal(t, int32(messagesToSendFromHook+messagesToSendFromOtherSources+messagesHeartbeat), int32(len(recorder.Messages())))

5452-5673: Add a negative-path test: hook error prevents subscription start

We should cover the case where the hook returns an error: no upstream messages should flow, an error should be written once, and the subscription should complete/close. This guards against regressions in error propagation order.

Proposed test snippet:

t.Run("SubscriptionOnStart error aborts subscription before start", func(t *testing.T) {
    c, cancel := context.WithCancel(context.Background())
    defer cancel()

    fakeStream := createFakeStream(
        func(counter int) (string, bool) {
            // Should never be called if the hook aborts correctly
            return `{"data":{"counter":999}}`, true
        },
        1*time.Millisecond,
        func(input []byte) {
            assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
        },
        func(ctx *Context, input []byte) error {
            return errors.New("hook failed")
        },
    )

    resolver, plan, recorder, id := setup(c, fakeStream)
    ctx := &Context{ctx: context.Background()}

    err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
    // Resolver should not fail starting the routine; error should be surfaced as a message
    assert.NoError(t, err)

    recorder.AwaitComplete(t, defaultTimeout)
    msgs := recorder.Messages()
    require.Len(t, msgs, 1)
    assert.JSONEq(t, `{"errors":[{"message":"hook failed"}],"data":null}`, msgs[0])
})

If you want, I can wire this in and adjust expectations based on your resolver’s exact error shape.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 16cb6de and 447c103.

📒 Files selected for processing (7)
  • v2/pkg/engine/datasource/graphql_datasource/configuration.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (3 hunks)
  • v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (2 hunks)
  • v2/pkg/engine/resolve/context.go (3 hunks)
  • v2/pkg/engine/resolve/datasource.go (1 hunks)
  • v2/pkg/engine/resolve/resolve.go (4 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (20 hunks)
🔇 Additional comments (10)
v2/pkg/engine/resolve/context.go (1)

36-37: Document emitEventFn Concurrency Contract

The emitEventFn callback is only ever set once on each cloned Context—before invoking SubscriptionOnStart or Start—and thereafter is read by EmitSubscriptionUpdate. It must not be reassigned or cleared concurrently while events are being emitted (or while Free is called).

Please add this comment immediately above the field in v2/pkg/engine/resolve/context.go to codify the intended lifecycle:

--- a/v2/pkg/engine/resolve/context.go
+++ b/v2/pkg/engine/resolve/context.go
@@ type Context struct {
-   emitEventFn      func(data []byte)
+   // emitEventFn is the callback installed by the subscription engine
+   // to deliver updates to the client. It is assigned exactly once
+   // on each cloned Context—prior to calling SubscriptionOnStart or
+   // Start—and must not be mutated concurrently with
+   // EmitSubscriptionUpdate or Free.
+   emitEventFn      func(data []byte)
v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go (1)

8916-8936: Good: validates ctx and input are passed to hook

This subtest precisely checks both context wiring and input propagation to the hook. Clear and robust.

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go (1)

453-455: LGTM: hooks are correctly wired into the SubscriptionSource instance.

Passing StartupHooks via subscriptionOnStartFns keeps configuration localized to the data source. Looks good.

v2/pkg/engine/resolve/resolve.go (3)

593-600: Good: detach and use a trigger-scoped cancelable context + updater.

Detaching from the request context and scoping cancellation to the trigger avoids coupling subscription lifetime to the HTTP request. The updater design cleanly feeds back into the event loop.


667-671: LGTM: synchronous gating of source start by the hook.

Starting the data source only if the hook succeeds prevents wasted connections and early events. Good sequencing.


1259-1265: Struct field reordering only; no functional impact detected.

Event struct remains compatible with existing switch handling. No concerns.

v2/pkg/engine/resolve/resolve_test.go (4)

4803-4810: Nice: injectable UniqueRequestID and onStart/hook fields improve test control

Allowing a custom uniqueRequestFn and the new hook field makes tests deterministic without coupling to internal hashing logic. LGTM.


4834-4837: Early exit for custom UniqueRequestID keeps tests deterministic

The override guard makes the hashing stable when tests need to force a specific key. Looks good.


5077-5082: Callsite update for createFakeStream hook param looks correct

Passing nil for the new subscriptionOnStartFn parameter keeps old behavior intact in tests that aren’t exercising hooks. No concerns here.


5625-5673: Two independent subscriptions to the same trigger verified end-to-end

Good coverage: exercising distinct connection/subscription IDs and the custom UniqueRequestID to validate isolation. LGTM.

Comment thread v2/pkg/engine/resolve/resolve.go Outdated
@dkorittki dkorittki requested a review from a team as a code owner September 30, 2025 14:15
@dkorittki dkorittki requested review from StarpTech and removed request for a team September 30, 2025 14:15
@dkorittki dkorittki force-pushed the ale/eng-7600-add-subscriptiononstarthandler branch from 4e9886b to a0024f0 Compare September 30, 2025 14:26
@dkorittki
Copy link
Copy Markdown
Contributor

dkorittki commented Sep 30, 2025

fyi I merged master branch in here to get upstream changes but rather should have updated topic/streams-v1 branch first, then this one from there. I undid this by removing the merge commit with all its related commits, and forced push this. That brings us back to where we were before I added the merge commit.

@dkorittki
Copy link
Copy Markdown
Contributor

Closed, as we do the actual review when merging branch topic/streams-v1 into master branch

@dkorittki dkorittki closed this Oct 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants